from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType

# 6. Top 10 merchants by sales volume in September
if __name__ == '__main__':
    # 0. Build the SparkSession entry point
    spark = SparkSession.builder. \
        appName("xixi2552"). \
        getOrCreate()
    # 1. Read the dataset from HDFS with an explicit schema
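    # Assumed column meanings (following the Taobao user-behavior layout): use_ID = user ID,
    # sel_ID = seller/merchant ID, ite_ID = item ID, cat_ID = category ID,
    # act_ID = action type code, time = action date as a yyyyMMdd string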
    schema = StructType().add("use_ID", StringType(), nullable=True). \
        add("sel_ID", StringType(), nullable=True). \
        add("ite_ID", StringType(), nullable=True). \
        add("cat_ID", StringType(), nullable=True). \
        add("act_ID", StringType(), nullable=True). \
        add("time", StringType(), nullable=True)
    df = spark.read.format("csv"). \
        option("sep", ","). \
        option("header", True). \
        option("encoding", "utf-8"). \
        schema(schema=schema). \
        load("hdfs://node1:8020/input/taobao.csv")
    df.createOrReplaceTempView("shop")
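    # 2. Aggregate with SQL and write the top-10 result straight to MySQL over JDBC.
    # This assumes the MySQL Connector/J jar is on the Spark classpath (e.g. supplied
    # via --jars or spark.jars); otherwise the save() below fails with a missing-driver error.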
    spark.sql("""
    select sel_ID ,sum(act_ID) as s from shop where  time between 20150901 and 20150930  group by sel_ID  order by s desc limit 10;
    """).write.mode("overwrite"). \
        format("jdbc"). \
        option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true"). \
        option("dbtable", "Sep_top"). \
        option("user", "root"). \
        option("password", "123456"). \
        save()
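
    # A minimal DataFrame-API sketch of the same aggregation, kept alongside the SQL
    # version for reference; like the SQL above, it assumes that summing act_ID is the
    # intended "sales volume" metric for the September window.
    top10_df = df.where(F.col("time").between("20150901", "20150930")). \
        groupBy("sel_ID"). \
        agg(F.sum("act_ID").alias("sales")). \
        orderBy(F.col("sales").desc()). \
        limit(10)
    top10_df.show()

    # Release Spark resources when the job is done
    spark.stop()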